package com.ctjsoft.datacollection.core.quartz;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.extra.mail.MailAccount;
import cn.hutool.extra.mail.MailUtil;
import com.alibaba.fastjson.JSON;
import com.ctjsoft.datacollection.core.constant.KettleConfig;
import com.ctjsoft.datacollection.core.enums.RunResultEnum;
import com.ctjsoft.datacollection.core.enums.RunTypeEnum;
import com.ctjsoft.datacollection.core.exceptions.MyMessageException;
import com.ctjsoft.datacollection.core.execute.JobExecute;
import com.ctjsoft.datacollection.core.execute.TransExecute;
import com.ctjsoft.datacollection.core.repository.RepositoryUtil;
import com.ctjsoft.datacollection.entity.*;
import com.ctjsoft.datacollection.mapper.KRepositoryMapper;
import com.ctjsoft.datacollection.service.KQuartzService;
import com.ctjsoft.datacollection.service.KScriptMonitorService;
import com.ctjsoft.datacollection.service.KScriptRecordService;
import com.ctjsoft.datacollection.service.KScriptService;
import com.ctjsoft.datacollection.util.*;
import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.repository.AbstractRepository;
import org.quartz.*;

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * 作业定时任务执行器
 * 因为定时器的job类和kettle的job类名一样，因此这里采用继承{@code org.quartz。InterruptableJob}类
 *
 * @author lyf
 */
@Slf4j
@DisallowConcurrentExecution
public class ScriptQuartz implements InterruptableJob {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // 此处无法使用常规注入方式注入bean
        KScriptMonitorService monitorService = SpringContextUtil.getBean(KScriptMonitorService.class);
        KScriptService scriptService = SpringContextUtil.getBean(KScriptService.class);
        KScriptRecordService scriptRecordService = SpringContextUtil.getBean(KScriptRecordService.class);
        KQuartzService quartzService = SpringContextUtil.getBean(KQuartzService.class);
        // 本次执行时间
        Date lastExecuteTime = jobExecutionContext.getFireTime();
        // 下一次任务时间
        Date nexExecuteTime = jobExecutionContext.getNextFireTime();
        // 运行状态
        boolean runStatus = true;
        // 获取传入过来的作业ID
        String scriptId = jobExecutionContext.getMergedJobDataMap().getString("id");

        KScript script = scriptService.getById(scriptId);
        // 设置执行参数
        Map<String, String> params = new HashMap<>(2);
        if (StringUtil.hasText(script.getSyncStrategy())) {
            Integer day = Integer.valueOf(script.getSyncStrategy().substring(2, script.getSyncStrategy().length()));

            params.put("start_time", DateUtil.getDateTimeStr(DateUtil.addDays(DateUtil.getTodayStartTime(), -day)));
            params.put("end_time", DateUtil.getDateTimeStr(DateUtil.addDays(DateUtil.getTodayEndTime(), -day)));
        }
        /**
         * 感谢gitee网友booleandev解决job传参问题
         * gitee主页：https://gitee.com/yanjiantao
         */
        // 执行参数加入到 job 中
        String scriptParams = script.getScriptParams();
        Map jsonToMap = JSON.parseObject(scriptParams);
        params.putAll(jsonToMap);
        // 执行作业并返回日志
        String logText = "";
        try {
            // 判断是执行资源库还是执行文件脚本
            switch (RunTypeEnum.getEnum(script.getExecuteType())) {
                case REP:
                    if (script.getScriptType() == "0" || script.getScriptType().equals("0")) {
                        logText = JobExecute.run(getAbstractRepository(script.getScriptRepositoryId())
                                , script.getScriptPath(), script.getScriptName()
                                , null, params
                                , LogLevel.getLogLevelForCode(script.getScriptLogLevel()));
                    } else {
                        logText = TransExecute.run(getAbstractRepository(script.getScriptRepositoryId())
                                , script.getScriptPath(), script.getScriptName()
                                , null, params
                                , LogLevel.getLogLevelForCode(script.getScriptLogLevel()));
                    }
                    break;
                case FILE:
                    if (script.getScriptType() == "0" || script.getScriptType().equals("0")) {
                        logText = JobExecute.run(script.getScriptPath(), null
                                , LogLevel.getLogLevelForCode(script.getScriptLogLevel()));
                    } else {
                        logText = TransExecute.run(script.getScriptPath(), null
                                , LogLevel.getLogLevelForCode(script.getScriptLogLevel()));
                    }
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + RunTypeEnum.getEnum(script.getExecuteType()));
            }
        } catch (KettleException e) {
            runStatus = false;
            String msg = "执行失败";
            log.error(msg, e);
            logText = e.getMessage();
            MailUtils mailUtils = SpringContextUtil.getBean(MailUtils.class);
            if(mailUtils.isStart()){
                MailAccount account = mailUtils.createSendConfig();
                String mailMessage = "采集任务执行失败，任务名：" + script.getScriptName() + ",错误信息：" + logText + ",失败时间：" + DateUtil.currentDateTimeStr();
                log.info("邮件收件人："+mailUtils.getRecipient()+"\n邮件内容："+mailMessage);
                String send = MailUtil.send(account, CollUtil.newArrayList(mailUtils.getRecipient()), "数据采集平台-任务失败通知", mailMessage, false);
            }
        }
        // 执行结束时间
        Date stopDate = new Date();

        // 输出日志到文件中,返回输出路径
        String logPath = writeStringToFile(String.valueOf(scriptId), logText);
        //获取定时任务状态
        KQuartz quartz = quartzService.getById(script.getScriptQuartz());
        //通过下一次执行时间判断是不是一次性任务，如果是一次性任务，执行结束后，修改任务状态
        if (nexExecuteTime == null && quartz.getQuartzCron() == null) {
            script.setScriptStatus("2");
            scriptService.updateById(script);
        }
        // 修改监控表数据
        KScriptMonitor scriptMonitor = new KScriptMonitor();
        scriptMonitor.setMonitorScriptId(scriptId);
        scriptMonitor.setLastExecuteTime(lastExecuteTime);
        scriptMonitor.setNextExecuteTime(nexExecuteTime);
        monitorService.updateMonitor(scriptMonitor, runStatus);

        // 添加作业执行记录
        KScriptRecord scriptRecord = new KScriptRecord();
        scriptRecord.setLogFilePath(logPath);
        scriptRecord.setRecordStatus(runStatus ? RunResultEnum.SUCCESS.getCode() : RunResultEnum.FAIL.getCode());
        scriptRecord.setRecordScriptId(scriptId);
        scriptRecord.setStartTime(lastExecuteTime);
        scriptRecord.setStopTime(stopDate);
        scriptRecordService.save(scriptRecord);
    }

    /**
     * 获取资源库
     *
     * @param transRepositoryId 资源库id
     * @return {@link AbstractRepository}
     */
    private AbstractRepository getAbstractRepository(String transRepositoryId) {
        AbstractRepository repository = RepositoryUtil.getRepository(transRepositoryId);
        if (repository == null) {
            KRepositoryMapper repositoryMapper = SpringContextUtil.getBean(KRepositoryMapper.class);
            KRepository kRepository = repositoryMapper.selectById(transRepositoryId);
            //Optional<Repository> optionalRepository = repRepository.findById(transRepositoryId);
            if (kRepository == null) {
                throw new MyMessageException("资源库不存在");
            }
            // 连接资源库
            repository = RepositoryUtil.connection(kRepository);
        }
        return repository;
    }

    /**
     * 输出日志到文件
     *
     * @param jobId   作业ID
     * @param logText 日志内容
     * @return 日志输出路径
     */
    private String writeStringToFile(String jobId, String logText) {
        String logPath = KettleConfig.logFilePath.concat("/").concat("job/").concat(jobId).concat("/").concat(String.valueOf(System.currentTimeMillis())).concat(".txt");
        try {
            FileUtil.writeStringToFile(new File(logPath), logText, KettleConfig.encoding.name(), false);
        } catch (IOException e) {
            String msg = "输出日志到文件失败";
            log.error(msg, e);
        }
        return logPath;
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {

    }
}
